Auditing this file for SQL backend readiness reveals several critical issues and opportunities for improvement:
# NOTE: DELETE before INSERT removed - unnecessary since RunID is unique per run
# Each pipeline execution generates a new RunID, so no duplicate data exists
Issue: The commented-out upsert logic suggests historical uncertainty about data freshness guarantees. While the comment claims "RunID is unique per run," this needs verification:
- Verify a UNIQUE constraint on (RunID, EquipID, Timestamp) exists in the SQL schema, or deduplicate at read time with ROW_NUMBER() window functions.
self._sql_health_cache: Tuple[float, bool] = (0.0, False)
self._sql_health_cache_duration = sql_health_cache_seconds
Issues:
- stats['sql_failures'] is incremented without any alerting.
Recommendation:
def _check_sql_health(self) -> bool:
"""Check SQL with circuit breaker pattern."""
now = time.time()
last_check, last_result = self._sql_health_cache
# Exponential backoff on repeated failures
if not last_result:
backoff = min(300, self._sql_health_cache_duration * (2 ** self.stats['sql_failures']))
if now - last_check < backoff:
return False
# Standard cache check
if now - last_check < self._sql_health_cache_duration:
return last_result
# Perform health check with transaction test
try:
cur = self.sql_client.cursor()
cur.execute("BEGIN TRANSACTION; SELECT 1; COMMIT;")
cur.fetchone()
self._sql_health_cache = (now, True)
self.stats['sql_failures'] = 0 # Reset on success
return True
except Exception as e:
self._sql_health_cache = (now, False)
self.stats['sql_failures'] += 1
Console.error(f"[OUTPUT] SQL health check failed (attempt {self.stats['sql_failures']}): {e}")
return False
finally:
try:
if 'cur' in locals():
cur.close()
except:
pass
if hasattr(self.sql_client, "commit"):
self.sql_client.commit()
elif hasattr(self.sql_client, "conn") and hasattr(self.sql_client.conn, "commit"):
if not getattr(self.sql_client.conn, "autocommit", True):
self.sql_client.conn.commit()
Issues:
- The code assumes autocommit=True is the default, but pyodbc connections to SQL Server default to autocommit=False.
Recommendation:
def _commit_transaction(self):
"""Commit with verification."""
try:
# Try direct commit first
if hasattr(self.sql_client, "commit"):
self.sql_client.commit()
return
# Fallback to conn.commit
if hasattr(self.sql_client, "conn"):
conn = self.sql_client.conn
# Force explicit commit regardless of autocommit mode
if hasattr(conn, "commit"):
conn.commit()
return
# Verify commit succeeded by checking @@TRANCOUNT
cur = self.sql_client.cursor()
cur.execute("SELECT @@TRANCOUNT")
trancount = cur.fetchone()[0]
if trancount > 0:
raise RuntimeError(f"Transaction not committed (@@TRANCOUNT={trancount})")
except Exception as e:
Console.error(f"[OUTPUT] Commit failed: {e}")
raise
# Replace extreme float values BEFORE replacing NaN
# NOTE(review): 1e100 is far below SQL Server FLOAT max (~1.79e308);
# presumably a conservative guard against driver overflow — confirm intent.
for col in df_clean.columns:
    # Only float columns can hold values large enough to need clamping.
    if df_clean[col].dtype in [np.float64, np.float32]:
        # assumes valid_mask is a boolean Series aligned to df_clean — TODO confirm
        extreme_mask = valid_mask & (df_clean[col].abs() > 1e100)
        if extreme_mask.any():
            # None becomes SQL NULL on insert; affected rows are not logged here.
            df_clean.loc[extreme_mask, col] = None
Issues:
- The 1e100 threshold is not documented (SQL Server FLOAT max is ~1.79e308).
- Extreme values are replaced with NULL, but the affected rows are not tracked.
Recommendation:
Make the threshold configurable as extreme_float_threshold (default 1e100) and log the affected rows:
if extreme_mask.any():
extreme_rows = df_clean[extreme_mask][[col]].head(5)
Console.warn(f"[SQL] Replaced {extreme_mask.sum()} extreme values in {table_name}.{col}: {extreme_rows.to_dict()}")
# only insert columns that actually exist in the table
columns = [c for c in df.columns if c in table_cols]
Issues:
- Silently drops critical columns (e.g. RunID, EquipID).
Recommendation:
# Validate critical columns before filtering
# NOTE(review): RunID/EquipID are the join keys for every output table;
# failing fast here prevents silently writing rows that cannot be joined.
CRITICAL_COLUMNS = {'RunID', 'EquipID'}
missing_critical = CRITICAL_COLUMNS - set(df.columns)
if missing_critical:
    raise ValueError(f"[SQL] Missing critical columns for {table_name}: {missing_critical}")
# Filter to schema columns with warning for dropped columns
columns = [c for c in df.columns if c in table_cols]
dropped = set(df.columns) - set(columns)
if dropped:
    # Warn rather than raise: extra DataFrame columns are expected when the
    # SQL schema lags behind pipeline outputs — TODO confirm this policy.
    Console.warn(f"[SQL] Dropping {len(dropped)} columns not in {table_name} schema: {dropped}")
insert_sql = f"INSERT INTO dbo.[{table_name}] ({cols_str}) VALUES ({placeholders})"
Issues:
- table_name comes from the ALLOWED_TABLES set (Lines 21-35) - GOOD.
Recommendation:
# Validate column names match SQL identifier rules
# NOTE(review): brackets alone do not make arbitrary names safe — a name
# containing "]" could break out of the quoting. This allowlist pattern
# rejects anything outside plain [A-Za-z_][A-Za-z0-9_]* identifiers.
import re
SQL_IDENTIFIER_PATTERN = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$')
for col in columns:
    if not SQL_IDENTIFIER_PATTERN.match(col):
        raise ValueError(f"[SQL] Invalid column name for {table_name}: {col}")
# Proceed with bracketed names
cols_str = ", ".join(f"[{c}]" for c in columns)
for i in range(0, len(records), self.batch_size):
batch = records[i:i+self.batch_size]
try:
cur.executemany(insert_sql, batch)
Current: Using executemany() with default batch size of 5000
Recommendations:
Use Table-Valued Parameters (TVP) for SQL Server (10-100x faster):
# Create TVP type in SQL Server
CREATE TYPE dbo.ACM_Scores_TVP AS TABLE (
Timestamp DATETIME2,
RunID VARCHAR(50),
EquipID INT,
DetectorType VARCHAR(50),
ZScore FLOAT
);
# Python code
from pyodbc import TVP
tvp_data = TVP(table_type_name='dbo.ACM_Scores_TVP', rows=records)
cur.execute("EXEC dbo.usp_BulkInsert_Scores @data=?", tvp_data)
Use BCP (Bulk Copy Program) for very large datasets (>100K rows):
# Write to temp CSV, then BULK INSERT
temp_csv = f"/tmp/{table_name}_{uuid.uuid4()}.csv"
df_clean.to_csv(temp_csv, index=False, header=False)
cur.execute(f"""
BULK INSERT dbo.[{table_name}]
FROM '{temp_csv}'
WITH (FORMAT = 'CSV', FIRSTROW = 1)
""")
os.remove(temp_csv)
Tune fast_executemany batch size per table:
# Different tables have different optimal batch sizes
BATCH_SIZES = {
'ACM_Scores_Long': 10000, # Narrow schema, small rows
'ACM_Scores_Wide': 1000, # Wide schema, large rows
'ACM_Episodes': 500 # Complex validation, smaller batches
}
batch_size = BATCH_SIZES.get(table_name, self.batch_size)
def __init__(self, sql_client=None, ...):
self.sql_client = sql_client
Current: Single connection passed in from caller
Recommendation: Implement connection pooling for parallel writes:
from queue import Queue
import threading
class OutputManager:
    """Illustrative sketch of pooled SQL connections for parallel writes.

    NOTE(review): recommendation-only pseudocode — the ``...`` in the
    constructor signature is a placeholder (not valid Python) and
    ``clone_connection()`` does not exist yet; do not copy verbatim.
    """
    def __init__(self, sql_client=None, pool_size: int = 5, ...):
        self.sql_client = sql_client
        # Bounded FIFO pool; Queue.get blocks when the pool is exhausted.
        self._connection_pool = Queue(maxsize=pool_size)
        # Pre-populate pool
        if sql_client:
            for _ in range(pool_size):
                conn = sql_client.clone_connection()  # Implement this
                self._connection_pool.put(conn)
    @contextmanager
    def _get_connection(self):
        """Get connection from pool with timeout."""
        # Raises queue.Empty after 30s if all connections are checked out.
        conn = self._connection_pool.get(timeout=30)
        try:
            yield conn
        finally:
            # Always return the connection, even if the caller raised.
            self._connection_pool.put(conn)
    def _bulk_insert_sql(self, table_name, df):
        """Use pooled connection."""
        with self._get_connection() as conn:
            cur = conn.cursor()
            # ... existing logic
self._table_exists_cache: Dict[str, bool] = {}
self._table_columns_cache: Dict[str, set] = {}
self._table_insertable_cache: Dict[str, set] = {}
Issues:
Recommendation:
from collections import OrderedDict
from time import time
class LRUCache:
    """LRU cache with a per-entry TTL.

    Entries expire ``ttl_seconds`` after they were last *set*; expired
    entries are dropped lazily on ``get``. When the cache is full, setting
    a new key evicts the least recently used entry.
    """

    def __init__(self, max_size=100, ttl_seconds=300):
        self.cache = OrderedDict()  # key -> (value, set_timestamp)
        self.max_size = max_size
        self.ttl = ttl_seconds

    def get(self, key):
        """Return the cached value, or None if missing or expired."""
        if key not in self.cache:
            return None
        value, timestamp = self.cache[key]
        if time() - timestamp > self.ttl:
            # Expired: drop eagerly so stale entries free their slots.
            del self.cache[key]
            return None
        self.cache.move_to_end(key)  # mark as most recently used
        return value

    def set(self, key, value):
        """Insert or refresh *key*, evicting the LRU entry when full."""
        if key in self.cache:
            # Refreshing must also mark the key most recently used; plain
            # reassignment keeps its old OrderedDict position (original bug).
            self.cache.move_to_end(key)
        elif len(self.cache) >= self.max_size:
            # Evict only when inserting a genuinely new key; the original
            # evicted even on refresh, shrinking the cache needlessly.
            self.cache.popitem(last=False)
        self.cache[key] = (value, time())
# Use in constructor
self._table_schema_cache = LRUCache(max_size=50, ttl_seconds=300)
def _to_naive(ts) -> Optional[pd.Timestamp]:
"""Convert to timezone-naive local timestamp or None."""
if ts is None or (isinstance(ts, float) and pd.isna(ts)):
return None
try:
result = pd.to_datetime(ts, errors='coerce')
# Strip timezone info if present
if hasattr(result, 'tz') and result.tz is not None:
return result.tz_localize(None)
return result
except Exception:
return None
Issues:
- DATETIME2 columns should store UTC, not local time.
Recommendation:
def _to_utc_naive(ts) -> Optional[pd.Timestamp]:
"""Convert to timezone-naive UTC timestamp for SQL Server.
SQL Server best practice: Store all timestamps as UTC in DATETIME2,
convert to local time at query/display time.
"""
if ts is None or (isinstance(ts, float) and pd.isna(ts)):
return None
try:
result = pd.to_datetime(ts, errors='coerce', utc=True)
if result is pd.NaT:
return None
# Convert to UTC then strip tz (SQL Server doesn't support tz-aware)
if hasattr(result, 'tz') and result.tz is not None:
result = result.tz_convert('UTC').tz_localize(None)
return result
except Exception:
return None
Schema Change Required:
-- Add computed column for local time display
ALTER TABLE dbo.ACM_HealthTimeline
ADD TimestampLocal AS DATEADD(HOUR, DATEDIFF(HOUR, GETUTCDATE(), GETDATE()), Timestamp);
-- Create index on UTC column for temporal queries
CREATE INDEX IX_HealthTimeline_Timestamp ON dbo.ACM_HealthTimeline(Timestamp);
Strengths: the OUT-17 pattern is excellent; NaN/Inf values are sanitized before SQL write.
Remaining work: add a UNIQUE constraint on (RunID, EquipID, Timestamp) in all time-series tables; harden _check_sql_health().
Blocking Issues: 2 (autocommit assumption, timestamp tz handling)
Critical Issues: 4 (health check, schema validation, float handling, commit verification)
Optimization Gaps: 4 (connection pooling, TVP, caching, indexing)
Recommendation: Address blocking issues before production deployment. This file is 70% SQL-ready but needs hardening for enterprise SQL Server environments.